/*
 * Copyright 2024 The PhoenixOS Authors. All rights reserved.
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <iostream>
#include <vector>
#include <algorithm>
#include <string>
#include <map>
#include <set>
#include <unordered_map>
#include <type_traits>
#include <thread>
#include <future>
#include <atomic>
#include <filesystem>
#include <stdint.h>
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <sys/stat.h>
#include "pos/include/common.h"
#include "pos/include/log.h"
#include "pos/include/utils/lockfree_queue.h"
#include "pos/include/checkpoint.h"
#include "pos/include/metrics.h"


#define kPOS_HandleDefaultSize   (1<<4)


/*!
 *  \brief  idx of base resource types
 */
enum : pos_resource_typeid_t {
    kPOS_ResourceTypeId_Unknown = 0,
    kPOS_ResourceTypeId_Num_Base_Type
};


/*!
 *  \brief  status of a handle instance
 */
enum pos_handle_status_t : uint8_t {
    /* ========= Status of Handle Existance ========= */
    /*!
     *  \brief  the resource behind this handle is active 
     *          on the XPU device, if an op rely on this 
     *          handle, it's ok to launch
     */
    kPOS_HandleStatus_Active = 0,

    /*!
     *  \brief  the resource behind this handle has been
     *          released manually by the client
     *  \note   this status is marked under worker function
     */
    kPOS_HandleStatus_Deleted,

    /*!
     *  \brief  the resource behind this handle are going
     *          to be deleted
     *  \note   this status is marked under parser function
     *  \note   once the handle is marked as this status in
     *          the parser function, subsequent parser
     *          function won't obtain this handle under
     *          get_handle_by_client_addr
     *  \note   it's ok for collect_broken_handles to skip
     *          such handle, as they still active currently
     *          (will be deleted under subsequent op)   
     */
    kPOS_HandleStatus_Delete_Pending,

    /*!
     *  \brief  the resource behind this handle is pending
     *          to be created on XPU
     */
    kPOS_HandleStatus_Create_Pending,

    /*!
     *  \brief  the resource behind this handle is broken
     *          on the XPU device, one need to restore the
     *          resource before launching any op that rely
     *          on it
     */
    kPOS_HandleStatus_Broken,

    /* ========= Status of Handle State ========= */
    /*!
     *  \brief  the state of resource behind this handle is
     *          ready on the XPU device
     */
    kPOS_HandleStatus_StateReady,

    /*!
     *  \brief  the state of resource behind this handle is
     *          missing on the XPU device, one need to reload
     *          the state before executing ops rely on it
     */
    kPOS_HandleStatus_StateMiss,
};


// forward declaration
template<class T_POSHandle>
class POSHandleManager;


// generated by protobuf
namespace pos_protobuf { class Bin_POSHandle; }
namespace google { namespace protobuf { class Message; } }


// this map is implemented in specific backend platform
extern std::map<pos_resource_typeid_t,std::string> pos_resource_map;


/*!
 *  \brief  a mapping of client-side and server-side handle, along with its metadata
 */
class POSHandle {
 public:
    /*!
     *  \param  client_addr_    the mocked client-side address of the handle
     *  \param  size_           size of the handle it self
     *  \param  hm              handle manager which this handle belongs to
     *  \param  id_             index of this handle in the handle manager list
     *  \param  state_size_     size of the resource state behind this handle
     *  \note   this constructor is for software resource, whose client-side address
     *          and server-side address could be seperated
     */
    POSHandle(
        void *client_addr_, size_t size_, void* hm, pos_u64id_t id_, size_t state_size_=0
    ) : client_addr(client_addr_),
        server_addr(nullptr),
        size(size_),
        id(id_),
        resource_type_id(kPOS_ResourceTypeId_Unknown),
        status(kPOS_HandleStatus_Create_Pending),
        state_status(kPOS_HandleStatus_StateReady), 
        state_size(state_size_),
        latest_version(0),
        ckpt_bag(nullptr),
        _hm(hm),
        _persist_thread(nullptr),
        _persist_promise(nullptr)
    {
        this->_state_preserve_counter.store(0);
    }


    /*!
     *  \param  size_           size of the resources represented by this handle
     *  \param  hm              handle manager which this handle belongs to
     *  \param  id_             index of this handle in the handle manager list
     *  \param  state_size_     size of the resource state behind this handle
     *  \note   this constructor is for hardware resource, whose client-side address
     *          and server-side address should be equal (e.g., memory)
     */
    POSHandle(
        size_t size_, void* hm, pos_u64id_t id_, size_t state_size_=0
    ) : client_addr(nullptr),
        server_addr(nullptr),
        size(size_),
        id(id_),
        resource_type_id(kPOS_ResourceTypeId_Unknown),
        status(kPOS_HandleStatus_Create_Pending),
        state_status(kPOS_HandleStatus_StateReady),
        state_size(state_size_),
        latest_version(0),
        ckpt_bag(nullptr),
        _hm(hm),
        _persist_thread(nullptr),
        _persist_promise(nullptr)
    {
        this->_state_preserve_counter.store(0);
    }


    /*!
     *  \param  hm  handle manager which this handle belongs to
     *  \note   this constructor is invoked during restore process, where the content of 
     *          the handle will be resume by deserializing from checkpoint binary
     */
    POSHandle(
        void* hm
    ) : id(0),
        resource_type_id(kPOS_ResourceTypeId_Unknown),
        status(kPOS_HandleStatus_Create_Pending),
        state_status(kPOS_HandleStatus_StateMiss),
        client_addr(nullptr),
        server_addr(nullptr),
        size(0),
        state_size(0),
        latest_version(0),
        ckpt_bag(nullptr),
        _persist_thread(nullptr),
        _persist_promise(nullptr),
        _hm(hm)
    {
        this->_state_preserve_counter.store(0);
    }


    ~POSHandle() = default;


    /* ====================== basic handle management ======================== */
 public:
    // index of this handle in the handle list of handle manager
    pos_u64id_t id;

    /*!
    *  \brief  the typeid of the resource kind which this handle represents
    *  \note   the children class of this base class should replace this value
    *          with their own typeid
    */
    pos_resource_typeid_t resource_type_id;


    // exitance and state status of the resource behind this handle
    pos_handle_status_t status;
    pos_handle_status_t state_status;

    // the mocked client-side address of the handle
    void *client_addr;

    // the actual server-side address of the handle
    void *server_addr;
    
    // remote sserver address on the backup device
    void *remote_server_addr;

    /*!
     *  \brief    size of the resources represented by this handle
     *  \example  the size of the buffer represented by current handler 
     *            (i.e., a device memory pointer)
     *  \note     for some handles (e.g., cudaStream_t), this value should remain
     *            constant —— kPOS_HandleDefaultSize
     */
    size_t size;

    /*!
     *  \brief  size of the resource state behind this handle
     */
    size_t state_size;

    /*!
     *  \brief  latest modified version of this handle
     *  \note   this field should be updated after the succesful execution of API within worker thread
     *          (and the API inout/output this handle)
     */
    pos_u64id_t latest_version;
    
    /*!
     *  \brief  identify whether current handle is the latest used handle in the manager
     *  \note   this field is only used during restore phrase
     */
    bool is_lastest_used_handle;


    /*!
     *  \brief  setting the server-side address of the handle after finishing allocation
     *  \param  addr  the server-side address of the handle
     */
    inline void set_server_addr(void *addr){ server_addr = addr; }


    /*!
     *  \brief  setting both the client-side and server-side address of the handle 
     *          after finishing allocation
     *  \param  addr        the setting address of the handle
     *  \param  handle_ptr  pointer to current handle
     *  \return POS_SUCCESS for successfully setting
     *          POS_FAILED_ALREADY_EXIST for duplication failed;
     */
    pos_retval_t set_passthrough_addr(void *addr, POSHandle* handle_ptr);


    /*!
     *  \brief  identify whether a given address is located within the resource
     *          that current handle represents
     *  \param  addr    the given address
     *  \param  offset  pointer to store the offset of the given address from the base
     *                  address, if the given address is located within the resource
     *                  that current handle represents
     *  \return identify result
     */
    bool is_client_addr_in_range(void *addr, uint64_t *offset=nullptr);


    /*!
     *  \brief  mark the status of this handle
     *  \param  status the status to mark
     *  \note   this function would call the inner function within the corresponding handle manager
     */
    void mark_status(pos_handle_status_t status);


    /*!
     *  \brief  mark the status of state of the resource behind this handle
     *  \param  status the state status to mark
     */
    inline void mark_state_status(pos_handle_status_t status){
        POS_ASSERT(status == kPOS_HandleStatus_StateReady || status == kPOS_HandleStatus_StateMiss);
        this->state_status = status;
    }


    /*!
     *  \brief  obtain the resource name begind this handle
     *  \return resource name begind this handle
     */
    virtual std::string get_resource_name(){ return std::string("unknown"); }


    /*!
     *  \brief  tear down the resource behind this handle, recycle it back to handle manager
     *  \note   this function is invoked when a client is dumped, and posd should tear down all resources
     *          it allocates on GPU
     *  \return POS_SUCCESS for successfully tear down
     */
    virtual pos_retval_t tear_down(){ return POS_FAILED_NOT_IMPLEMENTED; }
    /* ====================== basic handle management ======================== */


    /* ===================== parent handles management ======================= */
 public:
    /*!
     *  \brief  pointer to the instance of parent handle
     *  \note   this field is inited by parser thread, updated by worker thread
     *  \note   we seperate the init and create owner, as the __restore function
     *          of POSHandle could be executed in worker thread, where parent_handles
     *          would be accessed; so when we need to change parent handles, we here
     *          use the worker to update this parent_handles
     */
    std::vector<POSHandle*> parent_handles;

    /*!
     *  \brief  resource type and handle indices of parent handles of this handle
     *  \note   this field is filled during restore process, for temporily store the indices
     *          of all parent handles of this handle
     */
    std::vector<std::pair<pos_resource_typeid_t, pos_u64id_t>> parent_handles_waitlist;


    /*!
     *  \brief  record a new parent handle of current handle
     */
    inline void record_parent_handle(POSHandle* parent){
        POS_CHECK_POINTER(parent); parent_handles.push_back(parent);
    }


    /*!
     *  \brief  wrapper map to store broken handles
     */
    typedef struct pos_broken_handle_list {
        /*!
         *  \brief  list of broken handles
         *  \note   outter index: layer id
         */
        std::vector<std::vector<POSHandle*>*> _broken_handles;
        inline uint16_t get_nb_layers(){ return _broken_handles.size(); }

        /*!
         *  \brief  add new broken handle to the map
         *  \param  layer_id    index of the layer that this broken handle locates in
         *  \param  handle      pointer to the broken handle
         */
        inline void add_handle(uint16_t layer_id, POSHandle* handle){
            std::vector<POSHandle*> *vec;

            while(layer_id >= _broken_handles.size()){
                POS_CHECK_POINTER(vec = new std::vector<POSHandle*>());
                _broken_handles.push_back(vec);
            }

            _broken_handles[layer_id]->push_back(handle);
        }

        /*!
         *  \brief  reset this map (i.e., clear all recorded broken handles)
         */
        inline void reset(){
            uint16_t i;
            for(i=0; i<_broken_handles.size(); i++){
                if(likely(_broken_handles[i] != nullptr)){ _broken_handles[i]->clear(); }
            }
        }

        /*!
         *  \brief  repeatly call this function to traverse the current list
         *  \param  layer_id_keeper     keeping the intermedia traverse layer id
         *                              [default value should be the return value of get_nb_layers()]
         *  \param  handle_id_keeper    keeping the intermedia traverse handle id within the layer
         *                              [default value should be 0]
         *  \return non-nullptr for the obtained handle; nullptr for reaching the end of traversing
         */
        inline POSHandle* reverse_get_handle(uint16_t& layer_id_keeper, uint64_t& handle_id_keeper){
            POSHandle *retval = nullptr;
            
            POS_CHECK_POINTER(_broken_handles[layer_id_keeper]);

            if(unlikely(handle_id_keeper >= _broken_handles[layer_id_keeper]->size())){
                if(layer_id_keeper == 0){
                    goto exit;
                } else {
                    layer_id_keeper -= 1;
                    handle_id_keeper = 0;
                }
            }

            POS_CHECK_POINTER(_broken_handles[layer_id_keeper]);

            if(_broken_handles[layer_id_keeper]->size() > 0)
                retval = (*(_broken_handles[layer_id_keeper]))[handle_id_keeper];
            
            handle_id_keeper += 1;

        exit:
            return retval;
        }

        /*!
         *  \brief  deconstructor
         */
        ~pos_broken_handle_list(){
            uint16_t i;
            for(i=0; i<_broken_handles.size(); i++){
                if(likely(_broken_handles[i] != nullptr))
                   delete _broken_handles[i];
            }
        }
    } pos_broken_handle_list_t;


    /*!
     *  \brief  collect all broken handles along the handle trees
     *  \note   this function will call recursively, aware of performance issue!
     *  \param  broken_handle_list  list of broken handles, 
     *  \param  layer_id            index of the layer at this call
     */
    void collect_broken_handles(pos_broken_handle_list_t *broken_handle_list, uint16_t layer_id = 0);
    /* ===================== parent handles management ======================= */



    /* ==================== checkpoint add/commit/persist ==================== */
 public:
    /*!
     *  \brief  bag of checkpoints, implemented by different ckpt optimization level
     *  \note   it must be initialized by different implementations of stateful handle,
     *          as they might require different allocators and deallocators, see function
     *          __init_ckpt_bag
     */
    POSCheckpointBag *ckpt_bag;


    /*!
     *  \brief  reset the state preserve counter to zero, to start a new checkpoint round
     */
    void reset_preserve_counter();


    /*!
     *  \brief  add the state of the resource behind this handle to another on-device resource syncly
     *  \note   only handle of stateful resource should implement this method
     *  \note   this function should be called at the worker thread
     *  \param  version_id  version of this checkpoint
     *  \param  stream_id   index of the stream to do this checkpoint
     *  \return POS_SUCCESS for successfully added
     */
    pos_retval_t checkpoint_add(uint64_t version_id, uint64_t stream_id=0);


    /*!
     *  \brief  commit the device-side state of the resource behind this handle
     *  \note   only handle of stateful resource should implement this method
     *  \note   this function should be called at the worker thread
     *  \param  version_id  version of this checkpoint
     *  \param  stream_id   index of the stream to do this checkpoint
     *  \return POS_SUCCESS for successfully commited
     */
    pos_retval_t checkpoint_commit_async(uint64_t version_id, uint64_t stream_id=0);


    /*!
     *  \brief  checkpoint the state of the resource behind this handle (sync)
     *  \note   only handle of stateful resource should implement this method
     *  \note   this function should be called at the worker thread
     *  \param  version_id  version of this checkpoint
     *  \param  stream_id   index of the stream to do this checkpoint
     *  \return POS_SUCCESS for successfully checkpointed
     */
    pos_retval_t checkpoint_commit_sync(uint64_t version_id, uint64_t stream_id=0);


    /*!
     *  \brief  commit the host-side state of the resource behind this handle
     *  \note   this function should be called at the parser thread
     *  \note   only one satisfy the following conditions should the state commit via this interface:
     *          1.  state comes from host to device
     *          2.  there's no way to checkpoint state from device directly, so we need to record when it
     *              comes down from host
     *          example: GPU Modules
     *  \note   DON'T adopt this function on those resources that can retreive state from device (e.g., 
     *          GPU memory), as it will downgrade the performance of parser thread
     *  \param  version_id  version of this checkpoint
     *  \param  data        pointer to the host-side state to be commited
     *  \param  size        size of the host-side state to be commited
     *  \return POS_SUCCESS for successfully commited
     */
    pos_retval_t checkpoint_commit_host(uint64_t version_id, void* data, uint64_t size);


    /*!
     *  \brief  asynchronously persist this handle
     *  \param  ckpt_dir            directory to store checkpoint files
     *  \param  with_state          whether to persist with state
     *  \param  version_id          version of checkpoint to be persisted, if with_state is true
     *  \return POS_SUCCESS for successfully persisting  
     */
    pos_retval_t checkpoint_persist_async(std::string ckpt_dir, bool with_state, uint64_t version_id);


    /*!
     *  \brief  synchronously persist this handle
     *  \note   this function is for tracing system which need checkpoint without state
     *  \param  ckpt_dir            directory to store checkpoint files
     *  \param  with_state          whether to persist with state
     *  \param  version_id          version of checkpoint to be persisted, if with_state is true
     *  \return POS_SUCCESS for successfully persisting  
     */
    pos_retval_t checkpoint_persist_sync(std::string ckpt_dir, bool with_state, uint64_t version_id);


    /*!
     *  \brief  synchronize the persisting process
     *  \return POS_SUCCESS for successfully persist
     */
    pos_retval_t sync_persist();


 protected:
    // counter for exclude copy-on-write and checkpoint process
    std::atomic<uint8_t> _state_preserve_counter;

    // thread to persist checkpoint of the current handle
    std::thread *_persist_thread;
    std::promise<pos_retval_t> *_persist_promise;


    /*!
     *  \brief  add the state of the resource behind this handle to on-device memory
     *  \param  version_id  version of this checkpoint
     *  \param  stream_id   index of the stream to do this checkpoint
     *  \note   the add process must be sync
     *  \return POS_SUCCESS for successfully checkpointed
     */
    virtual pos_retval_t __add(uint64_t version_id, uint64_t stream_id=0){ 
        return POS_FAILED_NOT_IMPLEMENTED;
    }


    /*!
     *  \brief  commit the device-side state of the resource behind this handle
     *  \param  version_id  version of this checkpoint
     *  \param  stream_id   index of the stream to do this checkpoint
     *  \param  from_cow    whether to dump from on-device cow buffer
     *  \param  is_sync     whether the commit process should be sync
     *  \return POS_SUCCESS for successfully checkpointed
     */
    virtual pos_retval_t __commit(uint64_t version_id, uint64_t stream_id=0, bool from_cow=false, bool is_sync=false){
        // only stateful handle should rewrite this function
        POS_DEBUG("%s shouldn't called __commit function", this->get_resource_name().c_str());
        return POS_FAILED_NOT_IMPLEMENTED;
    }


    /*!
     *  \brief  obtain the checkpoint slot with corresponding version index for persist
     *  \param  ckpt_slot   obtained checkpoint slot
     *  \param  version_id  given version index
     *  \return POS_SUCCESS for successful get
     */
    virtual pos_retval_t __get_checkpoint_slot_for_persist(POSCheckpointSlot** ckpt_slot, uint64_t version_id){
        return POS_FAILED_NOT_IMPLEMENTED;
    }


    /*!
     *  \brief  generate protobuf message for this handle
     *  \param  binary      pointer to the generated binary
     *  \param  base_binary pointer to the base field inside the binary
     *  \return POS_SUCCESS for succesfully generation
     */
    virtual pos_retval_t __generate_protobuf_binary(google::protobuf::Message** binary, google::protobuf::Message** base_binary){
        return POS_FAILED_NOT_IMPLEMENTED;
    }


 private:
    /*!
     *  \brief  async thread to persist the checkpoint to file system
     *  \param  ckpt_slot   the checkopoint slot which stores the host-side checkpoint
     *  \param  ckpt_dir    directory to store the checkpoint
     *  \return POS_SUCCESS for successfully persist
     */
    pos_retval_t __persist_async_thread(POSCheckpointSlot* ckpt_slot, std::string ckpt_dir);
    /* ==================== checkpoint add/commit/persist ==================== */


    /* ======================== restore handle & state ======================= */
 public:
    /*!
     *  \brief  restore the current handle when it becomes broken status
     *  \return POS_SUCCESS for successfully restore
     */
    pos_retval_t restore();
    

    /*!
     *  \brief  reload the state behind current handle to the device
     *  \param  stream_id       stream for reloading the state
     *  \return POS_SUCCESS for successfully restore
     */
    pos_retval_t reload_state(uint64_t stream_id=0);


    /*!
     *  \note   binary area that mmap the checkpoint file of this handle,
     *          this field is used during restore phrase
     */
    void* restore_binary_mapped;
    uint64_t restore_binary_mapped_size;

    /*!
     *  \brief  restore the current handle when it becomes broken status
     *  \note   implemented by specific handle type
     *  \return POS_SUCCESS for successfully restore
     */
    virtual pos_retval_t __restore(){ 
        return POS_FAILED_NOT_IMPLEMENTED;
    }


    /*!
     *  \brief  reload state of this handle back to the device
     *  \note   implemented by specific handle type
     *  \param  mapped          mmap area of the checkpoint file of this handle
     *  \param  ckpt_file_size  size of the checkpoint size (mmap area)
     *  \param  stream_id       stream for reloading the state
     */
    virtual pos_retval_t __reload_state(void* mapped, uint64_t ckpt_file_size, uint64_t stream_id){
        return POS_FAILED_NOT_IMPLEMENTED;
    }
    /* ======================== restore handle & state ======================= */



    /* ===================== platform-specific functions ===================== */
 protected:
    /*!
     *  \brief  synchronize a specific device stream
     *  \param  stream_id   index of the stream to be synchronized
     *  \return POS_SUCCESS for successfully synchronizing
     */
    virtual pos_retval_t __sync_stream(uint64_t stream_id=0){ return POS_FAILED_NOT_IMPLEMENTED; }
    /* ===================== platform-specific functions ===================== */


 protected:
    /*!
     *  \note   the belonging handle manager
     */
    void *_hm;


    /*!
     *  \brief  initialize checkpoint bag of this handle
     *  \note   it must be implemented by different implementations of stateful 
     *          handle, as they might require different allocators and deallocators
     *  \return POS_SUCCESS for successfully initialization
     */
    virtual pos_retval_t __init_ckpt_bag(){ return POS_FAILED_NOT_IMPLEMENTED; }
};


/*!
 *  \brief   manager for handles of a specific kind of resource
 *  \tparam  T_POSHandle  specific handle class for the resource
 */
template<class T_POSHandle>
class POSHandleManager {
    /* ================================ basic ================================ */
 public:
    // range of the mocked client-side address
    #define kPOS_ResourceBaseAddr   0x555500000000
    #define kPOS_ResourceEndAddr    0xFFFFFFFFFFF0

    /*!
     *  \brief      last-used handle
     *  \example    for some handle manager (e.g., CUDA device, cuBLAS), one need to record the last-used handle 
     *              for later usage (e.g., cudaGetDevice, cublasSetStream)
     */
    T_POSHandle* latest_used_handle;


    /*!
     *  \brief    default handle to use
     *  \example  for some handle manager (e.g., CUDA stream), one need to record the default handle
     */
    T_POSHandle* default_handle;


    /*!
     *  \brief  constructor
     *  \param  passthrough indicate whether the handle's client-side and server-side address
     *                      are equal (true for hardware resource, false for software resource)
     */
    POSHandleManager(bool passthrough = false)
        : _base_ptr(kPOS_ResourceBaseAddr), _passthrough(passthrough), _rid(kPOS_ResourceTypeId_Unknown) {}


    ~POSHandleManager() = default;

    /*!
     *  \brief  initialize of the handle manager
     *  \note   pre-allocation of handles, e.g., default stream, device, context handles
     *  \param  related_handles related handles to allocate new handles in this manager
     *  \param  is_restoring    is_restoring    identify whether we're restoring a client, if it's, 
     *                          we won't initialize initial handles inside each 
     *                          handle manager
     *  \return POS_SUCCESS for successfully allocation
     */
    virtual pos_retval_t init(std::map<uint64_t, std::vector<POSHandle*>> related_handles, bool is_restoring){
        return POS_FAILED_NOT_IMPLEMENTED;
    }


    /*!
     *  \brief  allocate new mocked resource within the manager
     *  \param  handle              pointer to the mocked handle of the newly allocated resource
     *  \param  related_handles     all related handles for helping allocate the mocked resource
     *                              (note: these related handles might be other types)
     *  \param  size                size of the newly allocated resource
     *  \param  use_expected_addr   indicate whether to use expected client-side address
     *  \param  expected_addr       the expected mock addr to allocate the resource (optional)
     *  \param  state_size          size of resource state behind this handle  
     *  \return POS_FAILED_DRAIN for run out of virtual address space; 
     *          POS_SUCCESS for successfully allocation
     */
    virtual pos_retval_t allocate_mocked_resource(
        T_POSHandle** handle, 
        std::map<uint64_t, std::vector<POSHandle*>> related_handles,
        size_t size = kPOS_HandleDefaultSize,
        bool use_expected_addr = false,
        uint64_t expected_addr = 0,
        uint64_t state_size = 0
    );

    /*!
     *  \brief  list of handles managed by this manager (including those removed ones)
     */
    std::vector<T_POSHandle*> _handles;

    
    // resource type id of this handle manager
    pos_resource_typeid_t _rid;


    /*!
     *  \brief  allocate new mocked resource within the manager
     *  \param  handle              pointer to the mocked handle of the newly allocated resource
     *  \param  size                size of the newly allocated resource
     *  \param  use_expected_addr   whether to use expected client-side address
     *  \param  expected_addr       the expected mock addr to allocate the resource (optional)
     *  \note   this function should be internally invoked by allocate_mocked_resource, which leave 
     *          to children class to implement
     *  \return POS_FAILED_DRAIN for run out of virtual address space; 
     *          POS_SUCCESS for successfully allocation
     */
    pos_retval_t __allocate_mocked_resource(
        T_POSHandle** handle,
        size_t size=kPOS_HandleDefaultSize, 
        bool use_expected_addr = false, 
        uint64_t expected_addr=0,
        uint64_t state_size = 0
    );


 private:
    std::unordered_map<uint64_t, T_POSHandle*> _deleted_handle_address_map;
    /* ================================ basic ================================ */


    /* =========================== metric system ============================= */
 public:
    #if POS_CONF_RUNTIME_EnableTrace
        virtual void print_metrics(){}
    #endif
    /* =========================== metric system ============================= */



    /* =========================== handle getter ============================= */
 public:
    /*!
     *  \brief  obtain a handle by given client-side address
     *  \param  client_addr the given client-side address
     *  \param  handle      the resulted handle
     *  \param  offset      pointer to store the offset of the given address from the base address
     *  \return POS_FAILED_NOT_EXIST for no corresponding handle exists;
     *          POS_SUCCESS for successfully founded
     */
    virtual pos_retval_t get_handle_by_client_addr(void* client_addr, T_POSHandle** handle, uint64_t* offset=nullptr);


    /*!
     *  \brief  obtain the number of recorded handles
     *  \return the number of recorded handles
     */
    inline uint64_t get_nb_handles(){ return this->_handles.size(); }


    /*!
     *  \brief  obtain all handles
     *  \note   aware of thread safety when use this function
     *  \return the list of handles
     */
    inline std::vector<T_POSHandle*> get_handles(){ 
        /* here is a snapshot of current handle list */
        return this->_handles;
    }


    /*!
     *  \brief  obtain a handle by given index
     *  \param  id  the specified index
     *  \return pointer to the founed handle or nullptr
     */
    inline T_POSHandle* get_handle_by_id(uint64_t id){
        if(unlikely(id >= this->get_nb_handles())){
            return nullptr;
        } else {
            return this->_handles[id];
        }
    }


 protected:
    /*!
     *  \brief  obtain a handle by given client-side address
     *  \param  client_addr the given client-side address
     *  \param  handle      the resulted handle
     *  \param  offset      pointer to store the offset of the given address from the base address
     *  \note   this function should be internally invoked by get_handle_by_client_addr, which leave 
     *          to children class to implement
     *  \return POS_FAILED_NOT_EXIST for no corresponding handle exists;
     *          POS_SUCCESS for successfully founded
     */
    pos_retval_t __get_handle_by_client_addr(void* client_addr, T_POSHandle** handle, uint64_t* offset=nullptr);
    /* =========================== handle getter ============================= */



    /* ======================== address management =========================== */
 public:
    /*!
     *  \brief  record handle address to the address map
     *  \note   this function should be called right after a handle obtain its client-side address:
     *          (1) for non-passthrough handle: called within __allocate_mocked_resource;
     *          (2) for passthrough handle: called within handle->set_server_addr
     *  \param  addr    client-side address of the handle
     *  \param  handle  the handle to be recorded
     *  \return POS_SUCCESS for successfully recorded;
     *          POS_FAILED_ALREADY_EXIST for duplication failed
     */
    inline pos_retval_t record_handle_address(void* addr, T_POSHandle* handle){
        pos_retval_t retval = POS_SUCCESS;
        T_POSHandle *__tmp;
        uint64_t addr_u64 = (uint64_t)(addr);

        POS_CHECK_POINTER(handle);

        if(likely(POS_FAILED_NOT_EXIST == __get_handle_by_client_addr(addr, &__tmp))){
            _handle_address_map[addr_u64] = handle;
        } else {
            POS_CHECK_POINTER(__tmp);

            /*!
             *  \note   no need to be failed here, some handle will record duplicated resources on purpose, 
             *          e.g., CUFunction
             */
            // POS_WARN_C(
            //     "try to record duplicated handle to the manager: new_addr(%p), new_size(%lu), old_addr(%p), old_size(%lu)",
            //     addr, handle->size, __tmp->client_addr, __tmp->size
            // );
            // retval = POS_FAILED_ALREADY_EXIST;
        }

    // exit:
        return retval;
    }

 protected:
    uint64_t _base_ptr;
    
    /*!
     *  \brief  indicate whether the handle's client-side and server-side address are 
     *          equal (true for hardware resource, false for software resource)
     */
    bool _passthrough;


 private:
    std::map<uint64_t, T_POSHandle*> _handle_address_map;
    /* ======================== address management =========================== */


    /* ======================== incremental support ========================== */
 public:
    /*!
     *  \brief  record a new handle that will be modified
     *  \param  handle  the handle that will be modified
     */
    inline void record_modified_handle(T_POSHandle* handle){
        POS_CHECK_POINTER(handle);
        _modified_handles.insert(handle);
    }


    /*!
     *  \brief  clear all records of modified handles
     */
    inline void clear_modified_handle(){ 
        _modified_handles.clear();
    }


    /*!
     *  \brief  get all records of modified handles
     *  \return all records of modified handles
     */
    inline std::set<T_POSHandle*>& get_modified_handles(){
        return _modified_handles;
    }


 protected:
    /*!
     *  \brief  this map records all modified buffers since last checkpoint, 
     *          will be updated during parsing, and cleared during launching
     *          checkpointing op
     */
    std::set<T_POSHandle*> _modified_handles;
    /* ======================== incremental support ========================== */


    /* ===================== handle status management ======================== */
 public:
    inline pos_retval_t mark_handle_status(T_POSHandle *handle, pos_handle_status_t status){
        pos_retval_t retval = POS_SUCCESS;
        typename std::map<uint64_t, T_POSHandle*>::iterator handle_map_iter;
        
        POS_CHECK_POINTER(handle);
        
        switch (status)
        {
        case kPOS_HandleStatus_Active:
            handle->status = kPOS_HandleStatus_Active;
            POS_DEBUG_C(
                "mark handle as \"Active\" status: client_addr(%p), server_addr(%p)",
                handle->client_addr, handle->server_addr
            );
            break;

        case kPOS_HandleStatus_Broken:
            handle->status = kPOS_HandleStatus_Broken;
            POS_DEBUG_C(
                "mark handle as \"Broken\" status: client_addr(%p), server_addr(%p)",
                handle->client_addr, handle->server_addr
            );
            break;

        case kPOS_HandleStatus_Create_Pending:
            handle->status = kPOS_HandleStatus_Create_Pending;
            POS_DEBUG_C(
                "mark handle as \"Create_Pending\" status: client_addr(%p), server_addr(%p)",
                handle->client_addr, handle->server_addr
            );
            break;

        case kPOS_HandleStatus_Delete_Pending:
            handle->status = kPOS_HandleStatus_Delete_Pending;

            // remove the handle from the address map
            handle_map_iter = _handle_address_map.find((uint64_t)(handle->client_addr));
            if (likely(handle_map_iter != _handle_address_map.end())) {
                _deleted_handle_address_map.insert({
                    /* client_addr */ (uint64_t)(handle->client_addr),
                    /* handle */ handle_map_iter->second
                });
                _handle_address_map.erase((uint64_t)(handle->client_addr));   
            }

            POS_DEBUG_C(
                "mark handle as \"Delete_Pending\" status: client_addr(%p), server_addr(%p)",
                handle->client_addr, handle->server_addr
            );
            break;

        case kPOS_HandleStatus_Deleted:
            handle->status = kPOS_HandleStatus_Deleted;

            // remove the handle from the address map (should be already deleted in the last case)
            handle_map_iter = _handle_address_map.find((uint64_t)(handle->client_addr));
            if (unlikely(handle_map_iter != _handle_address_map.end())) {
                POS_WARN_C_DETAIL("remove handle from address map when mark it as deleted, is this a bug?");
                _deleted_handle_address_map.insert({
                    /* client_addr */ (uint64_t)(handle->client_addr),
                    /* handle */ handle_map_iter->second
                });
                _handle_address_map.erase((uint64_t)(handle->client_addr));
            }

            POS_DEBUG_C(
                "mark handle as \"Deleted\" status: client_addr(%p), server_addr(%p)",
                handle->client_addr, handle->server_addr
            );
            break;
        
        default:
            POS_ERROR_C_DETAIL("unknown status %u", status);
        }

        return retval;
    }
    /* ===================== handle status management ======================== */


    /* ====================== handle restore support ========================= */
 public:
    /*!
     *  \brief  restore single handle from binary checkpoint file in this handle manager
     *  \param  ckpt_file   path to the checkpoint file
     *  \param  hid         handle index to be restored
     *  \param  handle      pointer to the handle to be restored
     *  \return POS_SUCCESS for successfully restore
     */
    pos_retval_t reallocate_single_handle(const std::string& ckpt_file, pos_u64id_t hid, T_POSHandle **handle);


    /*!
     *  \brief  allocate and restore handles for provision, for fast restore
     *  \param  amount  amount of handles for pooling
     *  \return POS_SUCCESS for successfully preserving
     */
    virtual pos_retval_t preserve_pooled_handles(uint64_t amount){
        pos_retval_t retval = POS_SUCCESS;
        uint64_t i=0;
        T_POSHandle *handle = nullptr;

        for(i=0; i<amount; i++){
            retval = this->__allocate_mocked_resource(&handle);
            POS_CHECK_POINTER(handle);
            if(unlikely(retval != POS_SUCCESS)){
                POS_WARN_C("failed to preserve %s handle for fast restoring", handle->get_resource_name().c_str());
                retval = POS_FAILED;
                goto exit;
            }

            retval = handle->__restore();
            if(unlikely(retval != POS_SUCCESS)){
                POS_WARN_C("failed to restore %s handle after allocation for fast restoring", handle->get_resource_name().c_str());
                retval = POS_FAILED;
                goto exit;
            }

            this->_pooled_handles.insert(handle);
        }

    exit:
        return retval;
    }


    /*!
     *  \brief  restore handle from pool
     *  \param  handle  the handle to be restored
     *  \return POS_SUCCESS for successfully restoring
     *          POS_FAILED for failed pooled restoring, should fall back to normal path
     */
    virtual pos_retval_t try_restore_from_pool(T_POSHandle* handle){
        pos_retval_t retval = POS_SUCCESS;
        POSHandle *preserved_handle;

        POS_CHECK_POINTER(handle);

        if(unlikely(this->_pooled_handles.size() == 0)){
            retval = POS_FAILED;
            goto exit;
        }

        preserved_handle = (*(this->_pooled_handles.begin()));
        POS_CHECK_POINTER(preserved_handle);
        this->_pooled_handles.erase(this->_pooled_handles.begin());

        //! \todo   is simply reasssign server-side address enough?
        handle->server_addr = preserved_handle->server_addr;
        handle->status = kPOS_HandleStatus_Active;

    exit:
        return retval;
    }


 protected:
    /*!
     *  \brief  pooled active handles for fast restore
     */
    std::set<T_POSHandle*> _pooled_handles;

    
    /*!
     *  \brief  reallocate the extra fields of handle with specific type in this handle manager
     *  \note   this function is called by reallocate_single_handle, and implemented by
     *          specific handle type
     *  \param  mapped          mmap handle of the file
     *  \param  ckpt_file_size  size of the checkpoint size (mmap area)
     *  \param  handle          pointer to the restored handle
     *  \return POS_SUCCESS for successfully restore
     */
    virtual pos_retval_t __reallocate_single_handle(void* mapped, uint64_t ckpt_file_size, T_POSHandle** handle){
        return POS_FAILED_NOT_IMPLEMENTED;
    }


    /*!
     *  \brief  restore mocked resource in this handle manager with specific metadata
     *  \param  handle                  pointer to the handle to be restored
     *  \param  id                      index of the handle
     *  \param  client_addr             client-side address of the handle
     *  \param  server_addr             server-side address of the handle
     *  \param  size                    size of thhe handle
     *  \param  parent_handles_waitlist  list of parent handles and their type
     *  \param  state_size              size of state behind this handle
     *  \return POS_SUCCESS for successfully restore
     */
    pos_retval_t __restore_mocked_resource(
        T_POSHandle** handle,
        uint64_t id,
        uint64_t client_addr,
        uint64_t server_addr,
        uint64_t size,
        std::vector<std::pair<pos_resource_typeid_t, pos_u64id_t>> parent_handles_waitlist,
        uint64_t state_size
    );
    /* ====================== handle restore support ========================= */
};


/*!
 *  \brief  allocate new mocked resource within the manager
 *  \param  handle              pointer to the mocked handle of the newly allocated resource
 *  \param  size                size of the newly allocated resource
 *  \param  related_handles     all related handles for helping allocate the mocked resource
 *                              (note: these related handles might be other types)
 *  \param  use_expected_addr   indicate whether to use expected client-side address
 *  \param  expected_addr       the expected mock addr to allocate the resource (optional)
 *  \return POS_FAILED_DRAIN for run out of virtual address space; 
 *          POS_SUCCESS for successfully allocation
 */
template<class T_POSHandle>
pos_retval_t POSHandleManager<T_POSHandle>::allocate_mocked_resource(
    T_POSHandle** handle,
    std::map</* type */ uint64_t, std::vector<POSHandle*>> related_handles,
    size_t size,
    bool use_expected_addr,
    uint64_t expected_addr,
    uint64_t state_size
){
    return this->__allocate_mocked_resource(handle, size, use_expected_addr, expected_addr, state_size);
}


/*!
 *  \brief  allocate new mocked resource within the manager
 *  \param  handle              pointer to the mocked handle of the newly allocated resource
 *  \param  size                size of the newly allocated resource
 *  \param  use_expected_addr   whether to use expected client address
 *  \param  expected_addr       the expected mock addr to allocate the resource (optional)
 *  \note   this function should be internally invoked by allocate_mocked_resource, which leave to children class to implement
 *  \return POS_FAILED_DRAIN for run out of virtual address space;
 *          POS_FAILED_ALREADY_EXIST for duplication failed;
 *          POS_SUCCESS for successfully allocation
 */
template<class T_POSHandle>
pos_retval_t POSHandleManager<T_POSHandle>::__allocate_mocked_resource(
    T_POSHandle** handle,
    size_t size,
    bool use_expected_addr,
    uint64_t expected_addr,
    uint64_t state_size
){
    pos_retval_t retval = POS_SUCCESS;

    POS_CHECK_POINTER(handle);

    if(this->_passthrough){
        *handle = new T_POSHandle(
            /* size_ */ size,
            /* hm */ this,
            /* id_ */ this->_handles.size(),
            /* state_size_ */ state_size
        );
        POS_CHECK_POINTER(*handle);
    } else {
        // if one want to create on an expected address, we directly move the pointer forward
        if(unlikely(use_expected_addr == true)){
            this->_base_ptr = expected_addr;
        }

        // make sure the resource to be allocated won't exceed the range
        if(unlikely(kPOS_ResourceEndAddr - this->_base_ptr < size)){
            POS_WARN_C(
                "failed to allocate new resource, exceed range: request %lu bytes, yet %lu bytes left",
                size, kPOS_ResourceEndAddr - _base_ptr
            );
            retval = POS_FAILED_DRAIN;
            *handle = nullptr;
            goto exit;
        }

        *handle = new T_POSHandle(
            /* client_addr */ (void*)(this->_base_ptr),
            /* size_ */ size,
            /* hm */ this,
            /* id_ */ this->_handles.size(),
            /* state_size_ */ state_size
        );
        POS_CHECK_POINTER(*handle);

        // record client-side address to the map
        retval = record_handle_address((void*)(this->_base_ptr), *handle);
        if(unlikely(POS_SUCCESS != retval)){
            goto exit;
        }

        this->_base_ptr += size;
    }

    POS_DEBUG_C(
        "allocate new resource: _base_ptr(%lu), size(%lu), POSHandle.resource_type_id(%u)",
        this->_base_ptr, size, (*handle)->resource_type_id
    );

    this->_handles.push_back(*handle);

  exit:
    return retval;
}


/*!
 *  \brief  obtain a handle by given client-side address
 *  \param  client_addr the given client-side address
 *  \param  handle      the resulted handle
 *  \param  offset      pointer to store the offset of the given address from the base address
 *  \return POS_FAILED_NOT_EXIST for no corresponding handle exists;
 *          POS_SUCCESS for successfully founded
 */
template<class T_POSHandle>
pos_retval_t POSHandleManager<T_POSHandle>::get_handle_by_client_addr(void* client_addr, T_POSHandle** handle, uint64_t* offset){
    return __get_handle_by_client_addr(client_addr, handle, offset);
}


/*!
 *  \brief  obtain a handle by given client-side address
 *  \param  client_addr the given client-side address
 *  \param  handle      the resulted handle
 *  \param  offset      pointer to store the offset of the given address from the base address
 *  \note   this function should be internally invoked by get_handle_by_client_addr, which leave to children class to implement
 *  \return POS_FAILED_NOT_EXIST for no corresponding handle exists;
 *          POS_SUCCESS for successfully founded
 */
template<class T_POSHandle>
pos_retval_t POSHandleManager<T_POSHandle>::__get_handle_by_client_addr(void* client_addr, T_POSHandle** handle, uint64_t* offset){
    pos_retval_t ret = POS_SUCCESS;
    T_POSHandle *handle_ptr;
    uint64_t i;
    uint64_t client_addr_u64 = (uint64_t)(client_addr);

    typename std::map<uint64_t, T_POSHandle*>::iterator handle_map_iter;

    POS_CHECK_POINTER(handle);
    
    /*!
     *  \note   direct case: the given address is exactly the base address
     */
    if(unlikely(this->_handle_address_map.count(client_addr_u64) > 0)){
        *handle = this->_handle_address_map[client_addr_u64];

        /*!
         *  \note   those handle that has been deleted (i.e., kPOS_HandleStatus_Deleted) and 
         *          are going to be deleted (i.e., kPOS_HandleStatus_Delete_Pending) must be
         *          not in the map! 
         */
        POS_ASSERT(
            (*handle)->status != kPOS_HandleStatus_Deleted 
            && (*handle)->status != kPOS_HandleStatus_Delete_Pending
        );

        if(unlikely(offset != nullptr)){
            *offset = 0;
        }
        goto exit;
    }
    
    /*!
     *  \brief  indirect case: the given address is beyond the base address
     *  \note   most of query will fall back to this part
     */
    handle_map_iter = this->_handle_address_map.lower_bound(client_addr_u64);
    if(handle_map_iter != this->_handle_address_map.begin()){
        // get the first handle less than the given address
        handle_map_iter--;
        handle_ptr = handle_map_iter->second;

        POS_ASSERT(
            handle_ptr->status != kPOS_HandleStatus_Deleted && handle_ptr->status != kPOS_HandleStatus_Delete_Pending
        );

        if(likely(
            (uint64_t)(handle_ptr->client_addr) <= client_addr_u64 
            && client_addr_u64 < (uint64_t)(handle_ptr->client_addr) + handle_ptr->size
        )){
            *handle = handle_ptr;

            if(offset != nullptr){
                *offset = client_addr_u64 - (uint64_t)(handle_ptr->client_addr);
            }

            goto exit;
        }
    }

// not_found:
    *handle = nullptr;
    ret = POS_FAILED_NOT_EXIST;

exit:
    return ret;
}


template<class T_POSHandle>
pos_retval_t POSHandleManager<T_POSHandle>::__restore_mocked_resource(
    T_POSHandle** handle,
    uint64_t id,
    uint64_t client_addr,
    uint64_t server_addr,
    uint64_t size,
    std::vector<std::pair<pos_resource_typeid_t, pos_u64id_t>> parent_handles_waitlist,
    uint64_t state_size
){
    pos_retval_t retval = POS_SUCCESS;
    POS_CHECK_POINTER(handle);

    // resize vector on-demand
    if(this->_handles.size() <= id){
        this->_handles.resize(id+1, nullptr);
    }

    /*!
     *  \brief  check conflict on handle index
     *  \note   wo tolerate conflict handle index, as some initialized handles might be created while handle manager be initialized
     */
    // if(unlikely(this->_handles[id] != nullptr)){
    //     POS_WARN_C("failed to allocate mocked resource, conflict handle index: id(%lu)", id);
    //     retval = POS_FAILED_ALREADY_EXIST;
    //     goto exit;
    // }
    if(unlikely(this->_handles[id] != nullptr)){
        *handle = this->_handles[id];
        goto exit;
    }

    /*!
     *  \brief  check conflict on handle address
     *  \note   wo tolerate conflict client address, as some handle types are designed in this way (e.g., CUfunction)
     */
    // if(unlikely(this->_handle_address_map.count(client_addr) > 0)){
    //     POS_WARN_C("failed to allocate mocked resource, conflict client address: id(%lu), client_addr(%p)", id, client_addr);
    //     retval = POS_FAILED_ALREADY_EXIST;
    //     goto exit;
    // }

    if(this->_passthrough){
        *handle = new T_POSHandle(
            /* size_ */ size,
            /* hm */ this,
            /* id_ */ id,
            /* state_size_ */ state_size
        );
        POS_CHECK_POINTER(*handle);
        (*handle)->set_passthrough_addr((void*)(server_addr), (*handle));
    } else {
        // make sure the resource to be allocated won't exceed the range
        if(unlikely(kPOS_ResourceEndAddr - client_addr < size)){
            POS_WARN_C(
                "failed to allocate mocked resource, client addr exceed range: request %lu bytes, yet %lu bytes left",
                size, kPOS_ResourceEndAddr - _base_ptr
            );
            retval = POS_FAILED_DRAIN;
            *handle = nullptr;
            goto exit;
        }

        // push forward base_ptr
        if(unlikely(client_addr + size > this->_base_ptr)){
            this->_base_ptr = client_addr + size;
        }

        *handle = new T_POSHandle(
            /* client_addr */ (void*)(client_addr),
            /* size_ */ size,
            /* hm */ this,
            /* id_ */ id,
            /* state_size_ */ state_size
        );
        POS_CHECK_POINTER(*handle);
        (*handle)->set_server_addr((void*)(server_addr));
    }

    // record client-side address to the map
    retval = this->record_handle_address((void*)(client_addr), *handle);
    if(unlikely(POS_SUCCESS != retval)){
        goto exit;
    }

    // mark status
    (*handle)->mark_status(kPOS_HandleStatus_Broken);
    (*handle)->mark_state_status(kPOS_HandleStatus_StateMiss);

    // record parent handles
    (*handle)->parent_handles_waitlist = parent_handles_waitlist;

    POS_DEBUG_C("allocated mocked resource: client_addr(%p), size(%lu)", client_addr, size);
    this->_handles[id] = (*handle);

exit:
    return retval;
}


template<class T_POSHandle>
pos_retval_t POSHandleManager<T_POSHandle>::reallocate_single_handle(const std::string& ckpt_file, pos_u64id_t hid, T_POSHandle **handle){
    pos_retval_t retval = POS_SUCCESS;
    int fd;
    struct stat sb;
    void* mapped = nullptr;

    POS_CHECK_POINTER(handle);
    *handle = nullptr;

    POS_ASSERT(ckpt_file.size() > 0);
    
    if (unlikely(!std::filesystem::exists(ckpt_file))) {
        POS_WARN_C("failed to restore handles, ckpt file not exist: %s", ckpt_file.c_str())
        retval = POS_FAILED_INVALID_INPUT;
        goto exit;
    }

    // open the file
    fd = open(ckpt_file.c_str(), O_RDONLY);
    if(unlikely(fd < 0)) {
        POS_WARN_C("failed to restore handle, failed to open ckpt file: ckpt_file(%s)", ckpt_file.c_str());
        retval = POS_FAILED;
        goto exit;
    }

    // obtain the metadata of the file
    if(unlikely(fstat(fd, &sb) == -1)) {
        POS_WARN_C("failed to restore handle, failed obtain medadata of ckpt file: ckpt_file(%s)", ckpt_file.c_str());
        retval = POS_FAILED;
        goto exit;
    }

    // mmap the file
    mapped = mmap(nullptr, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0);
    if(unlikely(mapped == MAP_FAILED)) {
        POS_WARN_C("failed to restore handle, failed mmap the ckpt file: ckpt_file(%s)", ckpt_file.c_str());
        retval = POS_FAILED;
        goto exit;
    }

    // deserialize and reallocate new handle from the ckpt file
    retval = this->__reallocate_single_handle(mapped, sb.st_size, handle);
    if(unlikely(retval != POS_SUCCESS)){
        POS_WARN_C("failed to restore handle, restored with specific type: ckpt_file(%s), retval(%u)", ckpt_file.c_str(), retval);
        goto exit;
    }
    POS_CHECK_POINTER(*handle);

    // record mmaped area for later reload state
    if((*handle)->state_size > 0){
        (*handle)->restore_binary_mapped = mapped;
        (*handle)->restore_binary_mapped_size = sb.st_size;
    }

exit:
    if(unlikely(retval != POS_SUCCESS) || (*handle != nullptr && (*handle)->state_size == 0)){ 
        if(mapped != nullptr && mapped != MAP_FAILED){ munmap(mapped, sb.st_size); }
    }
    close(fd);
    return retval;
}
